
Adding timeAdd with timezone kernel #1700

Closed · 18 commits
Conversation

@thirtiseven (Collaborator) commented Jan 16, 2024

This PR adds a kernel for timeAdd with a non-UTC time zone (non-DST).

In Spark, timeAdd tries to keep the original offset after the addition, so we need special handling when the result falls in an overlap. Details are in the code comments.

The integration tests on the Spark side pass; I will add more tests in JNI later.

Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven self-assigned this Jan 16, 2024
@thirtiseven marked this pull request as ready for review January 16, 2024 11:02
@thirtiseven requested a review from res-life January 16, 2024 11:03
@revans2 (Collaborator) left a comment

In general the code looks like it would work, but there are a lot of details and cleanup items that would make it more robust and more maintainable.

cacheDatabase(); // lazy load the database
}
Integer tzIndex = instance.getZoneIDMap().get(currentTimeZone.normalized().toString());
Table transitions = instance.getTransitions();
@revans2 (Collaborator):

We could leak if an exception is thrown.

try (Table transitions = instance.getTransitions()) {
  return new ColumnVector(timeAddCS(...));
}

@thirtiseven (Collaborator Author):

done.

cacheDatabase(); // lazy load the database
}
Integer tzIndex = instance.getZoneIDMap().get(currentTimeZone.normalized().toString());
Table transitions = instance.getTransitions();
@revans2 (Collaborator):

Same here: we want transitions to be closed in all cases.

@thirtiseven (Collaborator Author):

done.

@@ -230,6 +269,11 @@ private void doLoadData() {
);
}
});
ZoneOffsetTransition last = transitions.get(transitions.size() - 1);
@revans2 (Collaborator) commented Jan 16, 2024:

So is this a bug fix for existing code, or is it only needed for the new kernels being added? Could you add some comments here to explain what this is for and how it works, just so we have requirements when we add or extend other time zone transition rules?

@thirtiseven (Collaborator Author):

upper_bound returns the end() iterator when the element to find is greater than every element in the container, so this was not a bug before: we only use idx - 1 when converting time zones. But it is required for timeAdd because of the index adjustment after the second upper_bound.

I think it is better to have such an endpoint so that each upper_bound returns an iterator pointing to a real transition.

@res-life (Collaborator) commented Jan 25, 2024:

Seems it's OK to add a max item at the end.

upper_bound:

#include <thrust/binary_search.h>
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>

thrust::device_vector<int> input(5);
input[0] = 0;
input[1] = 2;
input[2] = 5;
input[3] = 7;
input[4] = 8;
thrust::upper_bound(thrust::device, input.begin(), input.end(), 0); // returns input.begin() + 1
thrust::upper_bound(thrust::device, input.begin(), input.end(), 1); // returns input.begin() + 1
thrust::upper_bound(thrust::device, input.begin(), input.end(), 2); // returns input.begin() + 2
thrust::upper_bound(thrust::device, input.begin(), input.end(), 3); // returns input.begin() + 2
thrust::upper_bound(thrust::device, input.begin(), input.end(), 8); // returns input.end()
thrust::upper_bound(thrust::device, input.begin(), input.end(), 9); // returns input.end()

Another option: use lower_bound instead; then it != end && *it == searched_item finds an exact match. That would let us remove the min and max items, which will save GPU memory.

I propose to remove the min/max items and use lower_bound. This should be a follow-up.
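A minimal sketch of that proposed follow-up (illustrative only; the sample data mirrors the upper_bound example above):

#include <thrust/binary_search.h>
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <cstdio>

int main() {
  thrust::device_vector<int> input(5);
  input[0] = 0; input[1] = 2; input[2] = 5; input[3] = 7; input[4] = 8;

  int const searched_item = 5;
  // lower_bound returns the first position whose value is >= searched_item,
  // so no min/max sentinel items are needed for an exact-match lookup.
  auto const it = thrust::lower_bound(thrust::device, input.begin(), input.end(), searched_item);
  bool const found = (it != input.end() && *it == searched_item);
  std::printf("found=%d index=%ld\n", (int)found, (long)(it - input.begin()));
  return 0;
}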

(Collaborator):

Be careful: when there is no item in the vector, end - 1 will point to an invalid address.

@thirtiseven (Collaborator Author):

> Be careful: when there is no item in the vector, end - 1 will point to an invalid address.

Yes; since there is always a start point and an end point, this case will not happen.

@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.jni;

import java.beans.Transient;
@revans2 (Collaborator):

Why was this change made?

@thirtiseven (Collaborator Author):

Removed.

@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.jni;

import java.beans.Transient;
import java.time.ZoneId;
import java.util.List;

@revans2 (Collaborator):

Do we expect to add any tests for this code? Even just a few simple ones to show that we didn't break anything.

@thirtiseven (Collaborator Author):

Will do.

static_cast<size_t>(list_size));

// step 1: Binary search on utc to find the correct offset to convert utc to local
auto const utc_it = thrust::upper_bound(
@revans2 (Collaborator):

Don't we do this somewhere else? Shouldn't transitioning between time zones like this be a static inline function? And if not, can you add comments on why this is different from GpuTimeZoneDB.fromUtcTimestampToTimestamp?

@thirtiseven (Collaborator Author):

The difference between this and convert_timestamp_tz_functor is that here we want to keep the to_local_offset. I updated the comment.

@revans2 (Collaborator):

I still think there is enough code to make it common. If we take the time to build the proper abstractions on the CUDA side, then adding DST support should have code changes restricted primarily to one common place. But I will leave that up to you and the rest of the people working on time zone support.

* @param mr Device memory resource used to allocate the returned timestamp column's memory
*/
std::unique_ptr<cudf::column> time_add(
(Collaborator):

Nit: the duration parameter has two types, numeric_scalar and column_view. Extract a template over the duration parameter to combine the two time_add functions into one (sketch below).
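For illustration, a self-contained sketch of that refactor; the names and the trivial "add" body here are hypothetical stand-ins, not the actual cudf-based implementation. One template abstracts "duration for row i" so the scalar and column overloads collapse into a single function.

#include <cstdint>
#include <cstdio>
#include <vector>

// Hypothetical stand-ins for the scalar and column duration inputs.
struct scalar_duration {
  int64_t value;
  int64_t at(size_t) const { return value; }        // same duration for every row
};
struct column_duration {
  std::vector<int64_t> const& values;
  int64_t at(size_t i) const { return values[i]; }  // per-row duration
};

// One time_add template instead of two overloads; the real version would
// also apply the time zone transition fixup per row.
template <typename DurationT>
std::vector<int64_t> time_add(std::vector<int64_t> const& input, DurationT const& duration) {
  std::vector<int64_t> out(input.size());
  for (size_t i = 0; i < input.size(); ++i) { out[i] = input[i] + duration.at(i); }
  return out;
}

int main() {
  std::vector<int64_t> ts{100, 200, 300};
  std::vector<int64_t> durs{1, 2, 3};
  auto a = time_add(ts, scalar_duration{10});   // scalar path
  auto b = time_add(ts, column_duration{durs}); // column path
  std::printf("%lld %lld\n", (long long)a[0], (long long)b[2]);
  return 0;
}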

Signed-off-by: Haoyang Li <[email protected]>
@res-life (Collaborator):
Please check the 2024 copyrights.

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2024, NVIDIA CORPORATION.

(Collaborator):

2023-2024?

(Collaborator):

Should be 2023-2024, not 2024.

@thirtiseven (Collaborator Author):

done.

Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven (Collaborator Author) commented Jan 19, 2024

There is a bug in the current code. In Spark, TimeAdd is a plusDays followed by a plus(microseconds):

def timestampAddDayTime(micros: Long, dayTime: Long, zoneId: ZoneId): Long = {
  val days = dayTime / MICROS_PER_DAY
  val microseconds = dayTime - days * MICROS_PER_DAY
  val resultTimestamp = microsToInstant(micros)
    .atZone(zoneId)
    .plusDays(days)
    .plus(microseconds, ChronoUnit.MICROS)
  instantToMicros(resultTimestamp.toInstant)
}

which results in:

  • plusDays happens in local time, and plusDays is not equivalent to plus(MICROS_PER_DAY * days), so this PR is wrong here; I'm implementing the correct behavior (see the sketch after this list).
  • plusDays on a ZonedDateTime will also resolveLocal (choosing a preferred offset from the two possibilities when in an overlap), so there will be two resolveLocal calls in total, which might cause some unexpected behavior; I will investigate further.
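To make the first point concrete, a minimal C++20 <chrono> sketch of the same distinction (assumes a standard library that ships the IANA tzdb, e.g. recent GCC or MSVC; the zone and dates are illustrative):

#include <chrono>
#include <iostream>

int main() {
  using namespace std::chrono;
  auto const* tz = locate_zone("America/Los_Angeles");
  // 15:00 local on the day before the spring-forward gap; the local day of
  // 2024-03-10 is only 23 hours long in this zone.
  zoned_time start{tz, local_days{2024y / 3 / 9} + 15h};
  // "plusDays": add one calendar day in local time, like Spark's plusDays.
  zoned_time local_add{tz, start.get_local_time() + days{1}, choose::earliest};
  // "plus micros": add 24h of absolute time, like plus(MICROS_PER_DAY * days).
  zoned_time abs_add{tz, start.get_sys_time() + 24h};
  std::cout << local_add << '\n';  // 2024-03-10 15:00 PDT
  std::cout << abs_add << '\n';    // 2024-03-10 16:00 PDT -- one hour later
  return 0;
}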

jint tz_index)
{
JNI_NULL_CHECK(env, input_handle, "column is null", 0);
JNI_NULL_CHECK(env, transitions_handle, "column is null", 0);
@revans2 (Collaborator):

We need a null check on the duration handle too.
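Presumably, mirroring the existing checks above, something along these lines (the message string is illustrative):

JNI_NULL_CHECK(env, duration_handle, "duration is null", 0);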

@thirtiseven (Collaborator Author):

done.

try {
cudf::jni::auto_set_device(env);
auto const input = reinterpret_cast<cudf::column_view const*>(input_handle);
auto const duration =
@revans2 (Collaborator):

This is problematic because C++ will treat duration_handle as a pointer to cudf::duration_scalar<cudf::duration_us>, even if it is not one. The only thing that is guaranteed is:

    auto duration = reinterpret_cast<cudf::scalar *>(duration_handle);

Beyond that you will need to use dynamic_cast, where C++ will verify that it is the type you expect. Note that dynamic_cast returns nullptr if it is not the right type.
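A sketch of that pattern (a fragment, not the full JNI function; the error handling shown is an assumption, not the macro the codebase actually uses):

// Only the base type is guaranteed across the JNI boundary.
auto const base = reinterpret_cast<cudf::scalar const*>(duration_handle);
// dynamic_cast checks the runtime type and yields nullptr on a mismatch.
auto const duration = dynamic_cast<cudf::duration_scalar<cudf::duration_us> const*>(base);
if (duration == nullptr) {
  // Hypothetical handling; the real code would report through the JNI error macros.
  throw std::invalid_argument("duration is not a DURATION_MICROSECONDS scalar");
}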

@thirtiseven (Collaborator Author):

Thanks. I moved the dynamic_cast and type checking to timezones.cu L392-L386 because scalar is only a forward declaration here. I'm not quite sure I did it right there; please check again.

@revans2 (Collaborator) left a comment

It looks a lot better. The JNI casts for scalar values still need to be fixed. I also still think there is a lot of code that could be made more common, but I am not going to keep pushing for that; it is a "not now".

I also want to understand the long-term plan for how we want to make the gap processing common. I think there is a lot of overlap between parsing timestamps and adding time intervals, if you think about it.

Add Interval looks kind of like:

  1. Take the timestamp and convert it to the local time zone.
  2. Extract year, month, day, hour, minute, second, sub-second.
  3. Add the intervals.
  4. Convert from the time zone back to a UTC timestamp, minding any gap.

Parsing a timestamp ends up being:

  1. Parse out year, month, day, hour, minute, second, sub-second, and the time zone from the string.
  2. Convert from the time zone to a UTC timestamp, minding any gap.

But that is all hidden details and we don't have to do it all now. My biggest concern is that each of the PRs is adding a different way to find the gap, and I want to be sure that we don't have too many conflicts when merging them.

// equivalent to add the remainder to the utc result without resolving the offset.
auto const duration_value =
static_cast<int64_t>(cuda::std::chrono::duration_cast<cudf::duration_us>(duration).count());
auto const microseconds_per_day = 86400000000ll;
@revans2 (Collaborator):

Have we tested with more microseconds than in a day, along with a days value? I know this is an odd corner case, but if we have to apply days separately from microseconds, wouldn't we get the same kind of errors if we assume we can always convert microseconds into whole days? For DayTimeIntervalType, microsecond values outside of a day are not technically allowed, so we are probably okay there, but CalendarIntervalType does not document any such limitation, and reading the code there is none.

We might be able to work around this by restricting the usage of CalendarInterval, but I'd prefer not to need to do that.

@thirtiseven (Collaborator Author) commented Jan 24, 2024:

The following test passes with this PR:

@pytest.mark.parametrize('data_gen', [(0, 86400000001), (1, 86400000001), (1, 86400000000*100)], ids=idfn)
@allow_non_gpu(*non_supported_tz_allow)
def test_timeadd_debug(data_gen):
    days, microseconds = data_gen
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: unary_op_df(spark, TimestampGen(), length=200000)
            .selectExpr("a + (interval {} days {} microseconds)".format(days, microseconds)))

It's an interesting case I hadn't thought of. I'll check the code to see whether some validation in Spark/PySpark makes this test pass.

If it is possible to pass an invalid CalendarInterval, we can have a simple workaround on the plugin side: pass only the days to this kernel and then add the microseconds directly to the result.

@thirtiseven (Collaborator Author):

OK, this case failed on Spark 3.1.1: Spark seems to automatically use DayTimeIntervalType for interval literals in higher versions. I will match this behavior in the plugin PR.

@thirtiseven (Collaborator Author):

Matched this behavior in the plugin's datetimeExpressionsUtils.scala.

}
}

private static void warnUnsupportedTimeZone(ZoneId zoneId) {
@revans2 (Collaborator):

Nit: this is not a warning; could we call it something like assertTimeZoneSupported?

@thirtiseven (Collaborator Author):

done.

@thirtiseven (Collaborator Author):
@revans2 Thanks for the review. I will try to make the code more common and investigate the corner case; I think the other comments are addressed.
The common part between this PR and #1539 is GpuTimeZoneDB.java. There will be some conflicts, but we can handle that, because #1539 mainly works on the new column it adds.

Signed-off-by: Haoyang Li <[email protected]>
@res-life (Collaborator) commented Jan 25, 2024

> I also want to understand a long term plan for how we want to make the gap processing common. I think there is a lot of overlap between parsing timestamps and adding time intervals. [...] My biggest concern is that each of the PRs are adding in different ways to find the gap, and I want to be sure that we don't have too many conflicts when merging them both.

Referring to Java's ZoneRules, there are mainly three arrays:

    private final long[] savingsInstantTransitions;   // save n epoch seconds
    private final LocalDateTime[] savingsLocalTransitions; // save 2*n local times
    private final ZoneOffset[] wallOffsets;  // offsets

    for (ZoneOffsetTransition trans : transitionList) {
            if (trans.isGap()) {
                localTransitionList.add(trans.getDateTimeBefore());
                localTransitionList.add(trans.getDateTimeAfter());
            } else {
                localTransitionList.add(trans.getDateTimeAfter());
                localTransitionList.add(trans.getDateTimeBefore());
            }
            localTransitionOffsetList.add(trans.getOffsetAfter());
        }

In #1539 there are already 4 columns.
I propose to follow Java's approach, then implement:

private Object getOffsetInfo(LocalDateTime dt) // used for local to UTC
public ZoneOffset getOffset(Instant instant) // used for UTC to local

savingsLocalTransitions saves 2*n items (n is the fixed transition count), so the binary search takes one extra step compared to our current solution, which searches n items.

For savingsLocalTransitions, we can map them and save them as epoch seconds (rough sketch below).
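A rough sketch of how a lookup over those 2*n epoch-second local transitions could classify a local time. The layout follows java.time's ZoneRules as quoted above; the names, the parity convention, and the toy data are assumptions, not the actual implementation.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// local_transitions: 2*n ascending epoch seconds; entries 2k and 2k+1 bracket
// transition k (an overlap stores (after, before) to stay ascending).
// wall_offsets: n+1 offsets; wall_offsets[k] applies before transition k.
int64_t classify(std::vector<int64_t> const& local_transitions,
                 std::vector<int64_t> const& wall_offsets,
                 int64_t local_seconds) {
  auto const it = std::upper_bound(local_transitions.begin(), local_transitions.end(),
                                   local_seconds);
  auto const idx = it - local_transitions.begin();
  if (idx % 2 == 1) {
    std::printf("inside gap/overlap of transition %ld\n", (long)(idx / 2));
    return -1;  // caller must resolve the gap/overlap (e.g. keep the original offset)
  }
  return wall_offsets[idx / 2];  // plain region: a single valid offset
}

int main() {
  // Toy data: one transition where local time jumps from 100 to 200 (a gap).
  std::vector<int64_t> local_transitions{100, 200};
  std::vector<int64_t> wall_offsets{-8 * 3600, -7 * 3600};
  std::printf("%ld\n", (long)classify(local_transitions, wall_offsets, 50));   // -28800
  std::printf("%ld\n", (long)classify(local_transitions, wall_offsets, 150));  // in the gap
  std::printf("%ld\n", (long)classify(local_transitions, wall_offsets, 250));  // -25200
  return 0;
}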

Then we will update timeAdd, timezones.cu, and the timestamp parser.

Other: remove the first min and last max transitions to save GPU memory, and use lower_bound instead of upper_bound for the binary search.

For DST, it is mainly about:

  • Generating a new transition rule when the year is later than the last transition.
  • Finding the time on a specified weekday.

We can reuse this code for DST.

@thirtiseven (Collaborator Author):
build

@thirtiseven changed the base branch from branch-24.02 to branch-24.04 January 27, 2024 05:15
@thirtiseven marked this pull request as draft February 2, 2024 05:51
@thirtiseven (Collaborator Author):
Marking this as a draft because we need a long-term plan for the time zone issues.

@thirtiseven (Collaborator Author):
Closing: not planned for the near term.

@thirtiseven closed this Apr 8, 2024